往期精选
Fastjson 远程代码执行漏洞来袭,autotype 开关限制可被绕过
Windows 系统中关于网络常见的 9 个命令,非常实用!
https://github.com/alibaba/canal
Eevnt {
Header {
logfileName [binlog文件名]
logfileOffset [binlog position]
executeTime [binlog里记录变更发生的时间戳]
schemaName [数据库实例]
tableName [表名]
eventType [insert/update/delete类型]
}
entryType [事务头BEGIN/事务尾END/数据ROWDATA]
storeValue [byte数据,可展开,对应的类型为RowChange]
isDdl [是否是ddl变更操作,比如create table/drop table]
sql [具体的ddl sql]
rowDatas [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]
beforeColumns [Column类型的数组]
afterColumns [Column类型的数组]
}
Column {
index [column序号]
sqlType [jdbc type]
name [column name]
isKey [是否为主键]
updated [是否发生过变更]
isNull [值是否为null]
value [具体的值]
}
2)单Canal instance,单DTS数据订阅通道均只支持订阅一个RDS,提供给一个消费者;
server:代表一个canal运行实例,对应一个jvm
instance:对应于一个数据队列,一个instance订阅单个Mysql并支撑单个消费者,一个server内可运行多个instance
eventParser:数据源接入,模拟slave协议和master进行交互,协议解析
eventSink:Parser和Store链接器,进行数据过滤,加工,分发的工作
eventStore:数据存储模块
metaManager:客户端消费位点管理模块
Put : Sink模块进行数据存储的最后一次写入位置
Get : 数据订阅获取的最后一次提取位置
Ack : 数据消费成功的最后一次消费位置
拉取事件 -> 消费 -> 消费成功后ACK
// 创建CanalConnector, 连接到localhost:11111
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), destination, "", "");
connector.connect(); // 连接
connector.subscribe(); // 开始订阅binlog
// 开始循环拉取
while (running) {
Message message = connector.getWithoutAck(1024); // 获取指定数量的数据
long batchId = message.getId();
for (Entry entry : message.getEntries()){
// 对每条消息进行处理
}
connector.ack(batchId); // ack
}
DTS的客户端代码也类似(当然SDK提供的方法名和类名全都不一样),可以看到得益于Client的高度封装,相当易用。
总体而言,canal作为一个被广泛验证和使用的数据订阅与同步组件,是符合我们绝大部分的使用场景的。且得益于活跃的社区,目前还支持了admin控制台与不同类型目标数据库的导入,是值得我们深入学习并用于生产环境的。
作者:阿丸笔记
www.toutiao.com/i6831327080966259212/
往期精选
Fastjson 远程代码执行漏洞来袭,autotype 开关限制可被绕过
Windows 系统中关于网络常见的 9 个命令,非常实用!